Skip to content

Conversation

@vgvoleg
Copy link
Collaborator

@vgvoleg vgvoleg commented Aug 21, 2024

Pull request type

Please check the type of change your PR introduces:

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • Documentation content changes
  • Other (please describe):

What is the current behavior?

Issue Number: N/A

What is the new behavior?

Other information

@vgvoleg vgvoleg marked this pull request as draft August 21, 2024 14:52
@github-actions
Copy link

github-actions bot commented Aug 21, 2024

🌋 Here are results of SLO test for Python SDK over Table Service:

Grafana Dashboard

SLO-sync-python-table

@github-actions
Copy link

github-actions bot commented Aug 21, 2024

🌋 Here are results of SLO test for Python SDK over Query Service:

Grafana Dashboard

SLO-sync-python-query

@vgvoleg
Copy link
Collaborator Author

vgvoleg commented Aug 27, 2024

image

@vgvoleg vgvoleg marked this pull request as ready for review August 27, 2024 10:00
@vgvoleg vgvoleg requested a review from rekby August 27, 2024 10:00
pool._size = target_size
ids = set()

for i in range(1, target_size + 1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not

for i in range (target_size)

?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for better readability here

assert pool._current_size == i

I think it does not matter where to drop a +1 so it was a coin flip

ids.add(session._state.session_id)

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(pool.acquire(), timeout=0.5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be whait 0.1 or 0.01 second?

await pool.release(session)

session = await pool.acquire()
assert pool._current_size == target_size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about save session id before release and compare aquired session id with exactly saved id instead of one of?

docker_project.stop()
try:
await asyncio.wait_for(pool.acquire(), timeout=0.5)
except ydb.Error:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ydb.Error instead of TimeoutError?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we will have an error from driver after ydb scale down

done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
if task_stop in done:
queue_get.cancel()
return await self._create_new_session() # TODO: not sure why
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What mean the comment?


await asyncio.gather(*tasks)

logger.debug("All session were deleted.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about add __del__ for stop the pool? may be hard stop: without wait anything and with a log message?

fot prevent leaks


class SimpleQuerySessionCheckoutAsync:
def __init__(self, pool: QuerySessionPoolAsync):
def __init__(self, pool: QuerySessionPoolAsync, timeout: Optional[float] = None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need timeout in async?

start = time.monotonic()
if session is None and self._current_size == self._size:
try:
_, session = self._queue.get(block=True, timeout=timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is long operation under mutex lock.

How you will enforce smaller timeout on parallel request?

@vgvoleg vgvoleg requested a review from rekby September 4, 2024 15:36
done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
if task_stop in done:
queue_get.cancel()
return await self._create_new_session()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why it create a session on stop pool?
may be runtime error - similar to acquire on stopped pool?

if self._should_stop.is_set() or self._loop.is_closed():
return

self._loop.call_soon(functools.partial(self.stop))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need "partial"?

@vgvoleg vgvoleg merged commit 1a578f1 into main Sep 9, 2024
11 checks passed
@vgvoleg vgvoleg deleted the new_session_pool branch September 9, 2024 09:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants